package com.ocss.ocsscustomerserver.custom;


import com.alibaba.fastjson.JSONObject;
import com.ocss.ocsscommon.message.ClientMessage;
import com.ocss.ocsscommon.message.KafkaMessage;
//import com.ocss.ocsscommon.message.MessageFormat;
import com.ocss.ocsscommon.message.KafkaVisitorAssignMessage;
import com.ocss.ocsscustomerserver.tools.AllocatePost;
import com.ocss.ocsscustomerserver.tools.MessagePost;
import com.ocss.ocsscustomerserver.tools.WebsocketRecorder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.io.IOException;

/**
 * @Auther: lijiang
 * @Date: 2023-04-17 20:06
 * @Description: MessageCustomer
 */

@Component
public class KafkaConsumer {

    @Autowired
    AllocatePost allocatePost;


    @Autowired
    MessagePost messagePost;
    // 消费监听
    @KafkaListener(id = "consumer1",groupId = "my-group1", topics = {"visitorSend"})
    public void listen1(String data) throws IOException {
//        System.out.println(data);
       KafkaMessage kafkaMessage =JSONObject.parseObject(data, KafkaMessage.class);
        messagePost.sendMessage(kafkaMessage);
    }


    @KafkaListener(id = "consumer2",groupId = "my-group1", topics = {"assignCustomer"})
    public void listenEstablish(String data) throws IOException {
        System.out.println(data);
        KafkaVisitorAssignMessage kafkaMessage =JSONObject.parseObject(data, KafkaVisitorAssignMessage.class);

        System.out.println(kafkaMessage);
        if (kafkaMessage.getCommendType() == 4000) {
            allocatePost.Allocate(kafkaMessage);
        }else  if (kafkaMessage.getCommendType() == 4002){
            messagePost.informSessionExit (kafkaMessage);
        }

    }


}


